Skip to content

Conversation

@erikmunson
Copy link

@erikmunson erikmunson commented Oct 6, 2025

working on a rough pass at #128

current approach is intended to require minimal refactoring while achieving the developer-facing API i think probably will work best. some tests are not working yet, i will work on getting existing tests to pass and then add some for this new behavior if things seem reasonable.

as i dug into how to make multi-topic consumption and rebalancing work, i adjusted the DX from my initial code example in #128. the idea is to implement a new method, consumeByPartition, that upon initial consume returns a map of topic-partition streams where the key is of the form topic:partition. after a rebalance/rejoin, IF the assignment has changed such that the downstream app code needs to account for added/removed partitions, then a required onAssignmentChange() callback is called with a new map containing the full set of topic-partition streams:

let streamMap = consumer.consumeByPartition({
  topics: ['foo'],
  onAssignmentChange: (newStreamMap) => {
    for (const [topicPartition, stream] of newStreamMap) {
      const existingStream = streamMap.get(topicPartition)

      if (!existingStream) {
        // spin up downstream work that consumes the newly assigned topic-partition stream
      }

      streamMap = newStreams
    }
  }
})

for (const [topicPartition, stream] of streamMap) {
  const [topic, partition] = topicPartition.split(':')

  // spin up downstream work that consumes each topic-partition stream after initial join
}

when the streams for individual topic-partitions detect that the consumer no longer has an assignment for their topic-partition after a join/rebalance, they automatically close themselves, so in most cases developers do not have to write bespoke logic for cleaning up removed partition streams. if their consuming app code handles the stream ending correctly, their code will do the right thing by default. the case that needs hand-written logic after a rebalance is creating new stream consumers in app code when new partitions are assigned. giving developers a full map of the new topic-partition assignments allows them to serve any advanced/unusual needs they may have around comparing old/new streams.

i originally considered adding a mode/config option to the existing consume() method, and also considered adding a config at the Consumer class level. but in both cases mixing the type signatures and logic between the two modes was complex, and i didn't feel that complexity was buying a whole lot in terms of DX. i also thing splitting it into a separate method has the nice side benefit of keeping the normal consume() api very easy to read in getting started examples so the library stays approachable, while allowing more advanced users to reach for this method if they actually need it.

the primary thing that i'd like to see improved about performance is that calls to listOffsets, listCommittedOffsets, and fetch all still operate at the individual stream level, which means that the number of requests to those operations now increases with the number of partitions instead of one-per-leader. I think to resolve that we'd have to move some of the logic for listing offsets/commits and doing fetching up above the level of individual MessageStreams and coordinate them more. Given that the main goal here is to decouple fetching backpressure between topic-partitions, we likely don't want to fully consolidate fetching together into one coordinated thing, but i can imagine there are better ways to do it than the naive way it works right now. listing offsets/commits only needs to happen on initial join and rebalancing so i think that one is the more important candidate for optimization since ideally we only make one call per rejoin.

@erikmunson
Copy link
Author

on the performance thing, i suppose it's possible we could do pause tracking using the internal state from #129, and move the #fetch() logic that's current in MessagesStream up into Consumer. If we did that, then each MessagesStream instance could flip itself between paused/resumed when it fills up and drains, and we'd go back to having one consolidated fetch across streams. that's a more invasive refactor though.

@mcollina
Copy link
Member

mcollina commented Oct 6, 2025

I'm not exactly convinced about the DX of onAssignmentChange and what happens to the current streams. I think a better approach would be to terminate/destroy all streams and have the users re-create them.

WDYT?

@erikmunson
Copy link
Author

def agree the current API is a little wonky, couple thoughts:

  • for streams where the consuming code is quick to spin up/down, and buffering is minimal (e.g. default of 1024), tearing down all the streams and recreating would be conceptually simpler without much downside. but i do worry a bit about the perf overhead of doing that for every rebalance in low latency systems where spinning up work for a partition takes significant time or the consumer loses a ton of buffered messages and needs to refetch all of them. for example in our indexing service we buffer up to ~100k messages per partition and do work with them in large batches, with a goal of well under 1s of end to end latency. i don't know exactly how long it would take to do a rebalance and then fully tear down and re-hydrate all of that state and buffer content — could be not too bad or could be seconds. goal of keeping existing streams alive was to create minimal spinup/teardown disruption on rebalance for perf. maybe we can just go with the destroy-and-recreate behavior initially since it's simpler, and if i see long rebalance times in production monitoring we can come up with a higher perf option.

  • if we did close all streams and re-create, what are you thinking of in terms of DX for picking up those new streams after a rebalance? are you expecting someone would re-call consumeByPartition after somehow detecting a rebalance or something different?

an additional issue i was just thinking about this morning was the MANUAL offset mode, where devs pass in offsets for partitions themselves... even in today's regular consume() api this actually doesn't really work with rebalances if app code is tracking offsets manually in some other system like redis — if another consumer owns a partition initially and then it gets reassigned to you after you already have a consume() going, it will use an old offset you probably don't want. and there is currently now equivalent to the seek() api in kafkajs that lets you move to a different offset once consumption has started. i'll think a bit more about that maybe we just need a seek() method and that problem goes away. i'll open a separate issue to figure that out. maybe if we have people re-calling consumeByPartition() on every rebalance they'd be able to supply new offsets every time as a way to resolve.

@ShogunPanda
Copy link
Contributor

About the DX I agree with @mcollina: let's destroy everything and create from scratch.
As a good DX I would use another type of stream itself so people have several ways to consume them.

Something like:

// The returning stream is in objectMode.
const perPartitionStreams = consumer.consumeByPartition({ topics: ['foo'] })

// A new one is 
for await (const streams of perPartitionStreams) {
  for(const {topic, partition, stream} of streams) {
    // Do your magic. The stream will automatically close when assignments change.
  }
}

Note that batching shouldn't be an issue as in-memory messages are still processed when the stream is gracefully closed.

About the MANUAL problem I agree with you and the seek method is probably a good idea, just pick a better and explanatory name like setOffsetsToFetch or something like that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants